Problem:

SQL jest prosty. Funkcje okienkowe są proste, intuicyjne i przychodzi bez komplikacji. Natomiast w Spark? Jak wytłumaczyć koledze w zespole w jaki sposób tworzyć funkcję okienkowe w Spark? Od czego zacząć? Jakie są problemy i wyzwania w tworzeniu funkcji okienkowych? Jakie są różnic w porównaniu z SQL? Co jest łatwiejsze w utrzymaniu?

Rozwiązanie:

Na szybko można odpowiedzieć w ten sposób, że funkcja okienkowa tworzona jest w oderwaniu od data setu ale aplikowania na jego podzbiorze. Na przykład definicja wygląda w ten sposób:

user_window = Window.partitionBy("user_id").orderBy("event_time")

Jak to rozumieć? Dla każdego użytkownika okienko będzie posortowane po even_time i najstarsze dane będą pojawiały się na jako pierwsze a najświeższe dane wylądują w tym zbiorze danych jako ostatnie.

Potem wystarczy już tylko użyć funkcji okienkowej.

Ale zacznijmy po kolei i na przykładzie.

Funkcje okienkowe: Po co?

Po co są właściwie funkcje okienkowe? W jakich przypadkach się je stosuje?
Znasz zapewne funkcje grupujące? One działają na całym zbiorze dostępnych danych. Na całej tabeli lub dataframie.

Co jednak w przypadku, gdy w ramach tabeli chcesz przeliczyć sumy dla mniejszych podzbiorów. Klasyczny przykład to zliczenie sumy zarobków pracowników w departamencie.

Funkcje okienkowe są stworzone, do operacji które skupiają się na wycinku danych w ramach dużego zbioru.

Przykład

Dane na których będziemy pracować to tabela z użytkownikami odwiedzającymi witrynę internetową. Będziemy mieli dostępne identyfikator użytkownika, czas zaistnienia wydarzenia, typ wydarzenia i nazwa podstrony, która została odwiedzona.

Mniej więcej coś takiego:

create or replace table next_level_dm.d_user_sessions as 
select 'USER001' as user_id,'2025-04-10 08:05:12' as event_time,'page_view' event_type,'/home' as page union all
select 'USER001','2025-04-10 08:06:45','page_view','/products' union all
select 'USER001','2025-04-10 08:09:11','click','/products/shoes' union all
select 'USER001','2025-04-10 08:45:23','page_view','/cart' union all
select 'USER001','2025-04-10 09:20:10','page_view','/home' union all
select 'USER001','2025-04-10 09:22:05','click','/checkout' union all
select 'USER001','2025-04-10 09:25:47','purchase','/thank-you' union all
select 'USER002','2025-04-10 10:15:30','page_view','/blog'

Zadanie do wykonania

Dla każdego użytkownika przypisz numer sesji. Nowa sesja zaczyna się, gdy czas między jednym a kolejnym odwiedzeniem strony jest większy niż 30 minut.

Zacznijmy od definiowania okna, czyli podzbioru danych, na których będziemy operować:

user_window = Window.partitionBy("user_id").orderBy("event_time")

Co tutaj się dzieje, zdefiniowaliśmy okno per użytkownik. Czyli wszystkie funkcje okienkowe będą wykonywane w tylko dla tak ograniczonego podzbioru danych w naszym przypadku w ramach użytkownika. Co jest też istotne ustaliliśmy sortowanie po czasie eventu. Czyli wydarzenia będą wyświetlane od najstarszego do najmłodszego.

Jak dobrze zauważyłeś, okno jest stworzone w oderwaniu od datasetu. Dopiero potem aplikowane jest dla konkretnej funkcji okienkowej w ramach datasetu.

Użycie funkcji okienkowej

W celu policzenia, czy od ostatniego eventu minęło więcej niż 30 minut, musimy znać czas wystąpienia poprzedniego eventu. Do tego celu użyjemy funkcji okienkowej lag - pobierającej poprzedni element.

df_prev_event = user_sessions.withColumn("prev_event_time_per_user", lag("event_time").over(user_window))) 

Funkcja okienkowa lag pobiera poprzedni event_type dla okna zdefiniowanego w user_window. Czyli dla pierwszego rekordu będzie to wartość pusta (null) gdyż tam nie było jeszcze poprzedniego rekordu. Po dodaniu kolumny będziemy mieli taki zbiór danych:

spark window function

Rozwiązujemy zadanie

W celu rozwiązania zadanie będziemy potrzebowali jeszcze.

  1. Policzyć ile czasu w minutach upłynęło pomiędzy wejściami na kolejne strony
  2. Sprawdzić czy różnica czasu jest większa od 30 minut, jeżeli tak ustawić flagę.
  3. Nadać kolejne numery sesjom użytkownika

Ad1:

withColumn(
        "time_diff_minutes_per_user",
        (unix_timestamp("event_time") - unix_timestamp("prev_event_time_per_user")) / 60

Pobranie poprzedniego event time dla użytkownika, jeżeli poprzedniego nie było wtedy zwracana jest wartość null.

spark window function

Ad2:

withColumn("is_new_session", when(col("prev_event_time_per_user").isNull(), lit(1))
                .when(col("time_diff_minutes_per_user") > 30, lit(1))
                .otherwise(lit(0)))

Flaga wyliczona na podstawie wcześniej różnicy czasu.

spark window function

Ad3.

withColumn("session_id", sum("is_new_session").over(user_window)

Tutaj znowu wykorzystujemy funkcję okienkową do zsumowania flag is_new_session. Po uruchomieniu tej funkcji dostaniemy taki wynik:

spark window function

Spark kontra SQL

W Sparku okno definiujesz raz i raz przypisujesz do funkcji okienkowej. To jest zdecydowana zaleta, gdyż przy potencjalnych modyfikacjach zmiany okna, czyli albo zawężenia albo rozszerzenia lub zmianie warunków filtrowania, dokonujesz w je w jednym miejscu i nie musisz zmieniać kodu w wielu miejscach.

Takie definiowanie okna i wyzwalanie go w Sparku jest też intuicyjne. Różnicami jest to, że wystarczy okno zdefiniować raz i definiujemy je w oderwaniu od datasetu. Więc można tą samą funkcję okienkową używać w wielu datasetach.